/*
 * Copyright (C) 2014 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.errorprone.annotations.concurrent.GuardedBy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A list of listeners for implementing a concurrency friendly observable object.
 *
 * <p>Listeners are registered once via {@link #addListener} and then may be invoked by {@linkplain
 * #enqueue enqueueing} and then {@linkplain #dispatch dispatching} events.
 *
 * <p>The API of this class is designed to make it easy to achieve the following properties
 *
 * <ul>
 *   <li>Multiple events for the same listener are never dispatched concurrently.
 *   <li>Events for different listeners may be dispatched concurrently.
 *   <li>All events for a given listener dispatch on the {@link Executor} that was supplied to
 *       {@link #addListener} together with that listener.
 *   <li>It is easy for the user to ensure that listeners are never invoked while holding locks.
 * </ul>
 *
 * <p>The last point is subtle. Often the observable object will be managing its own internal state
 * using a lock, however it is dangerous to dispatch listeners while holding a lock because they
 * might run on the {@code directExecutor()} or be otherwise re-entrant (call back into your
 * object). So it is important to not call {@link #dispatch} while holding any locks. This is why
 * {@link #enqueue} and {@link #dispatch} are 2 different methods. It is expected that the decision
 * to run a particular event is made during the state change, but the decision to actually invoke
 * the listeners can be delayed slightly so that locks can be dropped. Also, because {@link
 * #dispatch} is expected to be called concurrently, it is idempotent.
 */
@GwtIncompatible
@ElementTypesAreNonnullByDefault
final class ListenerCallQueue<L>
{
    // TODO(cpovirk): consider using the logger associated with listener.getClass().
    private static final Logger logger = Logger.getLogger(ListenerCallQueue.class.getName());

    // TODO(chrisn): promote AppendOnlyCollection for use here.
    // Append-only: listeners are added but never removed, which is what makes the index-based
    // iteration in dispatch() safe.
    private final List<PerListenerQueue<L>> listeners =
            Collections.synchronizedList(new ArrayList<PerListenerQueue<L>>());

    /**
     * Method reference-compatible listener event.
     */
    interface Event<L>
    {
        /**
         * Call a method on the listener.
         */
        void call(L listener);
    }

    /**
     * Adds a listener that will be called using the given executor when events are later {@link
     * #enqueue enqueued} and {@link #dispatch dispatched}.
     *
     * @param listener the listener to notify; must not be null
     * @param executor the executor on which events for this listener are run; must not be null
     * @throws NullPointerException if either argument is null
     */
    public void addListener(L listener, Executor executor)
    {
        checkNotNull(listener, "listener");
        checkNotNull(executor, "executor");
        listeners.add(new PerListenerQueue<>(listener, executor));
    }

    /**
     * Enqueues an event to be run on currently known listeners.
     *
     * <p>The {@code toString} method of the Event itself will be used to describe the event in the
     * case of an error.
     *
     * @param event the callback to execute on {@link #dispatch}
     * @throws NullPointerException if {@code event} is null
     */
    public void enqueue(Event<L> event)
    {
        enqueueHelper(event, event);
    }

    /**
     * Enqueues an event to be run on currently known listeners, with a label.
     *
     * @param event the callback to execute on {@link #dispatch}
     * @param label a description of the event to use in the case of an error
     * @throws NullPointerException if {@code event} or {@code label} is null
     */
    public void enqueue(Event<L> event, String label)
    {
        enqueueHelper(event, label);
    }

    /**
     * Appends {@code event} and its {@code label} to the queue of every listener registered so far.
     * Listeners added after this call do not receive the event.
     */
    private void enqueueHelper(Event<L> event, Object label)
    {
        checkNotNull(event, "event");
        checkNotNull(label, "label");
        // Iterating a synchronizedList requires holding its lock manually; this also makes the
        // event land in all per-listener queues atomically with respect to addListener.
        synchronized (listeners)
        {
            for (PerListenerQueue<L> queue : listeners)
            {
                queue.add(event, label);
            }
        }
    }

    /**
     * Dispatches all events enqueued prior to this call, serially and in order, for every listener.
     *
     * <p>Note: this method is idempotent and safe to call from any thread. It should not be called
     * while holding locks, because a listener's executor may run the events inline.
     *
     * <p>If a listener's executor throws from {@link Executor#execute}, the exception is logged and
     * rethrown from this method, and listeners later in the list are not scheduled by this call.
     */
    public void dispatch()
    {
        // iterate by index to avoid concurrent modification exceptions
        for (int i = 0; i < listeners.size(); i++)
        {
            listeners.get(i).dispatch();
        }
    }

    /**
     * A special purpose queue/executor that dispatches listener events serially on a configured
     * executor. Each event can be added and dispatched as separate phases.
     *
     * <p>This class is very similar to {@link SequentialExecutor} with the exception that events can
     * be added without necessarily executing immediately.
     */
    private static final class PerListenerQueue<L> implements Runnable
    {
        final L listener;
        final Executor executor;

        // waitQueue and labelQueue are kept in lock-step: the i-th label describes the i-th event.
        @GuardedBy("this")
        final Queue<ListenerCallQueue.Event<L>> waitQueue = Queues.newArrayDeque();

        @GuardedBy("this")
        final Queue<Object> labelQueue = Queues.newArrayDeque();

        // True from the moment a drain task is handed to the executor until that task finds the
        // queue empty (or fails); guarantees at most one drain task per listener at a time.
        @GuardedBy("this")
        boolean isThreadScheduled;

        PerListenerQueue(L listener, Executor executor)
        {
            this.listener = checkNotNull(listener);
            this.executor = checkNotNull(executor);
        }

        /**
         * Enqueues an event to be run, together with the label used to describe it in error logs.
         */
        synchronized void add(ListenerCallQueue.Event<L> event, Object label)
        {
            waitQueue.add(event);
            labelQueue.add(label);
        }

        /**
         * Schedules a task on the executor that runs all events {@linkplain #add added} prior to
         * this call, serially and in order. Does nothing if such a task is already scheduled.
         *
         * <p>If the executor throws, the scheduled flag is cleared so that a later call can retry,
         * the failure is logged, and the exception is rethrown.
         */
        void dispatch()
        {
            boolean scheduleEventRunner = false;
            synchronized (this)
            {
                if (!isThreadScheduled)
                {
                    isThreadScheduled = true;
                    scheduleEventRunner = true;
                }
            }
            if (scheduleEventRunner)
            {
                try
                {
                    executor.execute(this);
                }
                catch (Exception e)
                {
                    // Catch Exception rather than RuntimeException: an executor can throw a checked
                    // exception "sneakily", and missing it would leave isThreadScheduled stuck at
                    // true, silently disabling every later dispatch for this listener.
                    // reset state in case of an error so that later dispatch calls will actually do something
                    synchronized (this)
                    {
                        isThreadScheduled = false;
                    }
                    // Log it and propagate to the caller.
                    logger.log(
                            Level.SEVERE,
                            "Exception while running callbacks for " + listener + " on " + executor,
                            e);
                    // Precise rethrow: execute() declares no checked exceptions, so no throws clause
                    // is needed here.
                    throw e;
                }
            }
        }

        /**
         * Drains the queue, invoking each event on the listener in order. Exceptions thrown by an
         * event are logged and do not prevent later events from running.
         */
        @Override
        public void run()
        {
            boolean stillRunning = true;
            try
            {
                while (true)
                {
                    ListenerCallQueue.Event<L> nextToRun;
                    Object nextLabel;
                    synchronized (PerListenerQueue.this)
                    {
                        Preconditions.checkState(isThreadScheduled);
                        nextToRun = waitQueue.poll();
                        nextLabel = labelQueue.poll();
                        if (nextToRun == null)
                        {
                            // Queue drained: clear the flag under the same lock as the poll so an
                            // event added right after this is picked up by the next dispatch().
                            isThreadScheduled = false;
                            stillRunning = false;
                            break;
                        }
                    }

                    // Always run while _not_ holding the lock, to avoid deadlocks.
                    try
                    {
                        nextToRun.call(listener);
                    }
                    catch (Exception e)
                    {
                        // Catch Exception rather than RuntimeException so that a sneakily thrown
                        // checked exception does not abort the loop and strand the remaining events.
                        // Log it and keep going.
                        logger.log(
                                Level.SEVERE,
                                "Exception while executing callback: " + listener + " " + nextLabel,
                                e);
                    }
                }
            }
            finally
            {
                if (stillRunning)
                {
                    // An Error (or a failed state check) is bubbling up. We should mark ourselves as
                    // no longer running. That way, if anyone tries to keep using us, we won't be
                    // corrupted.
                    synchronized (PerListenerQueue.this)
                    {
                        isThreadScheduled = false;
                    }
                }
            }
        }
    }
}
